home *** CD-ROM | disk | FTP | other *** search
- #!/usr/bin/env python
-
- ## Printing troubleshooter
-
- ## Copyright (C) 2008, 2009 Red Hat, Inc.
- ## Copyright (C) 2008, 2009 Tim Waugh <twaugh@redhat.com>
-
- ## This program is free software; you can redistribute it and/or modify
- ## it under the terms of the GNU General Public License as published by
- ## the Free Software Foundation; either version 2 of the License, or
- ## (at your option) any later version.
-
- ## This program is distributed in the hope that it will be useful,
- ## but WITHOUT ANY WARRANTY; without even the implied warranty of
- ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- ## GNU General Public License for more details.
-
- ## You should have received a copy of the GNU General Public License
- ## along with this program; if not, write to the Free Software
- ## Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
- import cups
- import dbus
- import dbus.glib
- import gobject
- import os
- import pango
- import tempfile
- import time
- from timedops import TimedOperation, OperationCanceled
-
- from base import *
-
- import errordialogs
- errordialogs.set_gettext_function (_)
- from errordialogs import *
-
- DBUS_PATH="/com/redhat/PrinterSpooler"
- DBUS_IFACE="com.redhat.PrinterSpooler"
- class PrintTestPage(Question):
- STATE = { cups.IPP_JOB_PENDING: _("Pending"),
- cups.IPP_JOB_HELD: _("Held"),
- cups.IPP_JOB_PROCESSING: _("Processing"),
- cups.IPP_JOB_STOPPED: _("Stopped"),
- cups.IPP_JOB_CANCELED: _("Canceled"),
- cups.IPP_JOB_ABORTED: _("Aborted"),
- cups.IPP_JOB_COMPLETED: _("Completed") }
-
- def __init__ (self, troubleshooter):
- Question.__init__ (self, troubleshooter, "Print test page")
- page = gtk.VBox ()
- page.set_spacing (12)
- page.set_border_width (12)
-
- label = gtk.Label ()
- label.set_alignment (0, 0)
- label.set_use_markup (True)
- label.set_line_wrap (True)
- page.pack_start (label, False, False, 0)
- self.main_label = label
- self.main_label_text = ('<span weight="bold" size="larger">' +
- _("Test Page") + '</span>\n\n' +
- _("Now print a test page. If you are having "
- "problems printing a specific document, "
- "print that document now and mark the print "
- "job below."))
-
- hbox = gtk.HButtonBox ()
- hbox.set_border_width (0)
- hbox.set_spacing (3)
- hbox.set_layout (gtk.BUTTONBOX_START)
- self.print_button = gtk.Button (_("Print Test Page"))
- hbox.pack_start (self.print_button, False, False, 0)
-
- self.cancel_button = gtk.Button (_("Cancel All Jobs"))
- hbox.pack_start (self.cancel_button, False, False, 0)
- page.pack_start (hbox, False, False, 0)
-
- tv = gtk.TreeView ()
- test_cell = gtk.CellRendererToggle ()
- test = gtk.TreeViewColumn (_("Test"), test_cell, active=0)
- job = gtk.TreeViewColumn (_("Job"), gtk.CellRendererText (), text=1)
- printer_cell = gtk.CellRendererText ()
- printer = gtk.TreeViewColumn (_("Printer"), printer_cell, text=2)
- name_cell = gtk.CellRendererText ()
- name = gtk.TreeViewColumn (_("Document"), name_cell, text=3)
- status = gtk.TreeViewColumn (_("Status"), gtk.CellRendererText (),
- text=4)
- test_cell.set_radio (False)
- self.test_cell = test_cell
- printer.set_resizable (True)
- printer_cell.set_property ("ellipsize", pango.ELLIPSIZE_END)
- printer_cell.set_property ("width-chars", 20)
- name.set_resizable (True)
- name_cell.set_property ("ellipsize", pango.ELLIPSIZE_END)
- name_cell.set_property ("width-chars", 20)
- status.set_resizable (True)
- tv.append_column (test)
- tv.append_column (job)
- tv.append_column (printer)
- tv.append_column (name)
- tv.append_column (status)
- tv.set_rules_hint (True)
- sw = gtk.ScrolledWindow ()
- sw.set_policy (gtk.POLICY_AUTOMATIC, gtk.POLICY_AUTOMATIC)
- sw.set_shadow_type (gtk.SHADOW_IN)
- sw.add (tv)
- self.treeview = tv
- page.pack_start (sw)
-
- label = gtk.Label (_("Did the marked print jobs print correctly?"))
- label.set_line_wrap (True)
- label.set_alignment (0, 0)
- page.pack_start (label, False, False, 0)
-
- vbox = gtk.VBox ()
- vbox.set_spacing (6)
- self.yes = gtk.RadioButton (label=_("Yes"))
- no = gtk.RadioButton (label=_("No"))
- no.set_group (self.yes)
- vbox.pack_start (self.yes, False, False, 0)
- vbox.pack_start (no, False, False, 0)
- page.pack_start (vbox, False, False, 0)
- self.persistent_answers = {}
- troubleshooter.new_page (page, self)
-
- def display (self):
- answers = self.troubleshooter.answers
- if not answers.has_key ('cups_queue'):
- return False
-
- parent = self.troubleshooter.get_window ()
- self.authconn = answers['_authenticated_connection']
- mediatype = None
- defaults = answers.get ('cups_printer_ppd_defaults', {})
- for opts in defaults.values ():
- for opt, value in opts.iteritems ():
- if opt == "MediaType":
- mediatype = value
- break
-
- if mediatype != None:
- mediatype_string = '\n\n' + (_("Remember to load paper of type "
- "'%s' into the printer first.") %
- mediatype)
- else:
- mediatype_string = ""
-
- label_text = self.main_label_text + mediatype_string
- self.main_label.set_markup (label_text)
-
- model = gtk.ListStore (gobject.TYPE_BOOLEAN,
- gobject.TYPE_INT,
- gobject.TYPE_STRING,
- gobject.TYPE_STRING,
- gobject.TYPE_STRING)
- self.treeview.set_model (model)
- self.job_to_iter = {}
-
- test_jobs = self.persistent_answers.get ('test_page_job_id', [])
- def get_jobs ():
- c = self.authconn
- jobs_dict = c.getJobs (which_jobs='not-completed',
- my_jobs=False)
- completed_jobs_dict = c.getJobs (which_jobs='completed')
- return (jobs_dict, completed_jobs_dict)
-
- self.op = TimedOperation (get_jobs, parent=parent)
- try:
- (jobs_dict, completed_jobs_dict) = self.op.run ()
- except OperationCanceled:
- return False
-
- # We want to display the jobs in the queue for this printer...
- try:
- queue_uri_ending = "/" + self.troubleshooter.answers['cups_queue']
- jobs_on_this_printer = filter (lambda x:
- jobs_dict[x]['job-printer-uri'].\
- endswith (queue_uri_ending),
- jobs_dict.keys ())
- except:
- jobs_on_this_printer = []
-
- # ...as well as any other jobs we've previous submitted as test pages.
- jobs = list (set(test_jobs).union (set (jobs_on_this_printer)))
-
- completed_jobs_dict = None
- for job in jobs:
- try:
- j = jobs_dict[job]
- except KeyError:
- try:
- j = completed_jobs_dict[job]
- except KeyError:
- continue
-
- iter = model.append (None)
- self.job_to_iter[job] = iter
- model.set_value (iter, 0, job in test_jobs)
- model.set_value (iter, 1, job)
- self.update_job (job, j)
-
- return True
-
- def connect_signals (self, handler):
- self.print_sigid = self.print_button.connect ("clicked",
- self.print_clicked)
- self.cancel_sigid = self.cancel_button.connect ("clicked",
- self.cancel_clicked)
- self.test_sigid = self.test_cell.connect ('toggled',
- self.test_toggled)
-
- def create_subscription ():
- c = self.authconn
- sub_id = c.createSubscription ("/",
- events=["job-created",
- "job-completed",
- "job-stopped",
- "job-progress",
- "job-state-changed"])
- return sub_id
-
- parent = self.troubleshooter.get_window ()
- self.op = TimedOperation (create_subscription, parent=parent)
- try:
- self.sub_id = self.op.run ()
- except OperationCanceled:
- pass
-
- try:
- bus = dbus.SystemBus ()
- except:
- bus = None
-
- self.bus = bus
- if bus:
- bus.add_signal_receiver (self.handle_dbus_signal,
- path=DBUS_PATH,
- dbus_interface=DBUS_IFACE)
-
- self.timer = gobject.timeout_add (1000, self.update_jobs_list)
-
- def disconnect_signals (self):
- if self.bus:
- self.bus.remove_signal_receiver (self.handle_dbus_signal,
- path=DBUS_PATH,
- dbus_interface=DBUS_IFACE)
-
- self.print_button.disconnect (self.print_sigid)
- self.cancel_button.disconnect (self.cancel_sigid)
- self.test_cell.disconnect (self.test_sigid)
-
- def cancel_subscription (sub_id):
- c = self.authconn
- c.cancelSubscription (sub_id)
-
- parent = self.troubleshooter.get_window ()
- self.op = TimedOperation (cancel_subscription,
- (self.sub_id,),
- parent=parent)
- try:
- self.op.run ()
- except OperationCanceled:
- pass
-
- try:
- del self.sub_seq
- except:
- pass
-
- gobject.source_remove (self.timer)
-
- def collect_answer (self):
- if not self.displayed:
- return {}
-
- self.answers = self.persistent_answers.copy ()
- parent = self.troubleshooter.get_window ()
- success = self.yes.get_active ()
- self.answers['test_page_successful'] = success
-
- class collect_jobs:
- def __init__ (self, model):
- self.jobs = []
- model.foreach (self.each, None)
-
- def each (self, model, path, iter, user_data):
- self.jobs.append (model.get (iter, 0, 1, 2, 3, 4))
-
- model = self.treeview.get_model ()
- jobs = collect_jobs (model).jobs
- def collect_attributes (jobs):
- job_attrs = None
- c = self.authconn
- with_attrs = []
- for (test, jobid, printer, doc, status) in jobs:
- attrs = None
- if test:
- try:
- attrs = c.getJobAttributes (jobid)
- except AttributeError:
- # getJobAttributes was introduced in pycups 1.9.35.
- if job_attrs == None:
- job_attrs = c.getJobs (which_jobs='all')
-
- attrs = self.job_attrs[jobid]
-
- with_attrs.append ((test, jobid, printer, doc, status, attrs))
-
- return with_attrs
-
- self.op = TimedOperation (collect_attributes,
- (jobs,),
- parent=parent)
- try:
- with_attrs = self.op.run ()
- self.answers['test_page_job_status'] = with_attrs
- except OperationCanceled:
- pass
-
- return self.answers
-
- def cancel_operation (self):
- self.op.cancel ()
-
- # Abandon the CUPS connection and make another.
- answers = self.troubleshooter.answers
- factory = answers['_authenticated_connection_factory']
- self.authconn = factory.get_connection ()
- self.answers['_authenticated_connection'] = self.authconn
-
- def handle_dbus_signal (self, *args):
- debugprint ("D-Bus signal caught: updating jobs list soon")
- gobject.source_remove (self.timer)
- self.timer = gobject.timeout_add (200, self.update_jobs_list)
-
- def update_job (self, jobid, job_dict):
- iter = self.job_to_iter[jobid]
- model = self.treeview.get_model ()
- try:
- printer_name = job_dict['printer-name']
- except KeyError:
- try:
- uri = job_dict['job-printer-uri']
- r = uri.rfind ('/')
- printer_name = uri[r + 1:]
- except KeyError:
- printer_name = None
-
- if printer_name != None:
- model.set_value (iter, 2, printer_name)
-
- model.set_value (iter, 3, job_dict['job-name'])
- model.set_value (iter, 4, self.STATE[job_dict['job-state']])
-
- def print_clicked (self, widget):
- now = time.time ()
- tt = time.localtime (now)
- when = time.strftime ("%d/%b/%Y:%T %z", tt)
- self.persistent_answers['test_page_attempted'] = when
- answers = self.troubleshooter.answers
- parent = self.troubleshooter.get_window ()
-
- def print_test_page (*args, **kwargs):
- factory = answers['_authenticated_connection_factory']
- c = factory.get_connection ()
- return c.printTestPage (*args, **kwargs)
-
- tmpfname = None
- mimetypes = [None, 'text/plain']
- for mimetype in mimetypes:
- try:
- if mimetype == None:
- # Default test page.
- self.op = TimedOperation (print_test_page,
- (answers['cups_queue'],),
- parent=parent)
- jobid = self.op.run ()
- elif mimetype == 'text/plain':
- (tmpfd, tmpfname) = tempfile.mkstemp ()
- os.write (tmpfd, "This is a test page.\n")
- os.close (tmpfd)
- self.op = TimedOperation (print_test_page,
- (answers['cups_queue'],),
- kwargs={'file': tmpfname,
- 'format': mimetype},
- parent=parent)
- jobid = self.op.run ()
- try:
- os.unlink (tmpfname)
- except OSError:
- pass
-
- tmpfname = None
-
- jobs = self.persistent_answers.get ('test_page_job_id', [])
- jobs.append (jobid)
- self.persistent_answers['test_page_job_id'] = jobs
- break
- except OperationCanceled:
- self.persistent_answers['test_page_submit_failure'] = 'cancel'
- break
- except RuntimeError:
- self.persistent_answers['test_page_submit_failure'] = 'connect'
- break
- except cups.IPPError, (e, s):
- if (e == cups.IPP_DOCUMENT_FORMAT and
- mimetypes.index (mimetype) < (len (mimetypes) - 1)):
- # Try next format.
- if tmpfname != None:
- os.unlink (tmpfname)
- tmpfname = None
- continue
-
- self.persistent_answers['test_page_submit_failure'] = (e, s)
- show_error_dialog (_("Error submitting test page"),
- _("There was an error during the CUPS "
- "operation: '%s'.") % s,
- self.troubleshooter.get_window ())
- break
-
- def cancel_clicked (self, widget):
- self.persistent_answers['test_page_jobs_cancelled'] = True
- jobids = []
- for jobid, iter in self.job_to_iter.iteritems ():
- jobids.append (jobid)
-
- def cancel_jobs (jobids):
- c = self.authconn
- for jobid in jobids:
- try:
- c.cancelJob (jobid)
- except cups.IPPError, (e, s):
- if e != cups.IPP_NOT_POSSIBLE:
- self.persistent_answers['test_page_cancel_failure'] = (e, s)
-
- self.op = TimedOperation (cancel_jobs,
- (jobids,),
- parent=self.troubleshooter.get_window ())
- try:
- self.op.run ()
- except OperationCanceled:
- pass
-
- def test_toggled (self, cell, path):
- model = self.treeview.get_model ()
- iter = model.get_iter (path)
- active = model.get_value (iter, 0)
- model.set_value (iter, 0, not active)
-
- def update_jobs_list (self):
- def get_notifications (self):
- c = self.authconn
- try:
- notifications = c.getNotifications ([self.sub_id],
- [self.sub_seq + 1])
- except AttributeError:
- notifications = c.getNotifications ([self.sub_id])
-
- return notifications
-
- # Enter the GDK lock. We need to do this because we were
- # called from a timeout.
- gtk.gdk.threads_enter ()
-
- parent = self.troubleshooter.get_window ()
- self.op = TimedOperation (get_notifications,
- (self,),
- parent=parent)
- try:
- notifications = self.op.run ()
- except OperationCanceled:
- gtk.gdk.threads_leave ()
- return True
-
- answers = self.troubleshooter.answers
- model = self.treeview.get_model ()
- queue = answers['cups_queue']
- test_jobs = self.persistent_answers.get('test_page_job_id', [])
- for event in notifications['events']:
- seq = event['notify-sequence-number']
- try:
- if seq <= self.sub_seq:
- # Work around a bug in pycups < 1.9.34
- continue
- except AttributeError:
- pass
- self.sub_seq = seq
- job = event['notify-job-id']
-
- nse = event['notify-subscribed-event']
- if nse == 'job-created':
- if (job in test_jobs or
- event['printer-name'] == queue):
- iter = model.append (None)
- self.job_to_iter[job] = iter
- model.set_value (iter, 0, True)
- model.set_value (iter, 1, job)
- else:
- continue
- elif not self.job_to_iter.has_key (job):
- continue
-
- if (job in test_jobs and
- nse in ["job-stopped", "job-completed"]):
- comp = self.persistent_answers.get ('test_page_completions', [])
- comp.append ((job, event['notify-text']))
- self.persistent_answers['test_page_completions'] = comp
-
- self.update_job (job, event)
-
- # Update again when we're told to. (But we might update sooner if
- # there is a D-Bus signal.)
- gobject.source_remove (self.timer)
- self.timer = gobject.timeout_add (1000 *
- notifications['notify-get-interval'],
- self.update_jobs_list)
- debugprint ("Update again in %ds" %
- notifications['notify-get-interval'])
- gtk.gdk.threads_leave ()
- return False
-